Classifying Tennis Strokes - Computer Vision Project¶
Problem Statement¶
In this project, computer vision is used to distinguish between the following tennis strokes: forehand, backhand, and serve.
The trained model should learn patterns that also transfer to a new dataset.
For the first dataset we found an article in which 500 images per stroke had already been classified. The article is titled "Motion Analysis of Tennis Strokes Using Pose Estimation".
Link to the article:
https://www.sciencedirect.com/science/article/pii/S2352340924006322
For the second dataset we collected images ourselves by filming these strokes and then converting the footage into individual frames.
The goal of this test is to develop a model that not only achieves high test accuracy within a single dataset, but also recognizes patterns across multiple datasets.
Preparing the Images - Cropping¶
Visualizing the Images¶
import os
import matplotlib.pyplot as plt
from PIL import Image

# Get the current working directory
base_path = os.getcwd()

# Define the relative path to the folder containing the categories
parent_folder_path = "/teamspace/studios/this_studio/final_folder_testing/original_pictures"

# Define the categories
categories = ["forehand", "backhand", "serve"]

# Function to display an image from each category
def show_images_from_categories(base_path, categories, parent_folder_path):
    fig, axes = plt.subplots(1, len(categories), figsize=(15, 5))
    for ax, category in zip(axes, categories):
        category_path = os.path.join(parent_folder_path, category)
        # Select an image from the category
        image_files = [f for f in os.listdir(category_path) if f.lower().endswith(('png', 'jpg', 'jpeg'))]
        if image_files:
            # Select the first image
            image_path = os.path.join(category_path, image_files[0])
            img = Image.open(image_path)
            ax.imshow(img)
            ax.set_title(category)
        else:
            ax.set_title(f"No images in '{category}'")
        ax.axis("off")
    plt.tight_layout()
    plt.show()

# Display images
show_images_from_categories(base_path, categories, parent_folder_path)
YOLOv5 for Image Cropping¶
- YOLOv5 detects persons in the tennis images.
- It identifies the bounding boxes around the players.
- The bounding boxes are then cropped with a margin and scaled to a uniform target size.
from pathlib import Path

# Note: `model` is the YOLOv5 detector loaded via torch.hub (see the cropping section further below)
def crop_and_resize_image(image_path: str, output_folder: str, base_folder: str, margin=0.5, target_size=(224, 224)):
    try:
        # Derive the relative path to retain folder structure
        relative_path = Path(image_path).relative_to(base_folder)  # e.g., backhand/B_001.jpeg
        new_file_path = Path(output_folder) / relative_path  # e.g., cropped_original/original_pictures/backhand/B_001.jpeg

        # Skip if the file already exists
        if new_file_path.exists():
            return True

        # Load the image
        image = Image.open(image_path)
        results = model(image_path)  # Run YOLO detection on the image

        # Filter detections for 'person' class
        person_detections = [det for det in results.xyxy[0].tolist() if int(det[5]) == 0]  # Class '0' is 'person'
        if not person_detections:
            print(f"No person detected in {image_path}")
            return False

        # Find the largest bounding box, assumed to be the closest person
        closest_person = max(person_detections, key=lambda det: (det[2] - det[0]) * (det[3] - det[1]))

        # Extract bounding box coordinates
        xmin, ymin, xmax, ymax = map(int, closest_person[:4])

        # Calculate margin for consistent cropping
        img_width, img_height = image.size
        box_width = xmax - xmin
        box_height = ymax - ymin

        # Apply the margin, clamped to the image borders
        xmin = max(0, xmin - int(box_width * margin))
        ymin = max(0, ymin - int(box_height * margin))
        xmax = min(img_width, xmax + int(box_width * margin))
        ymax = min(img_height, ymax + int(box_height * margin))

        # Crop the image around the player
        cropped_image = image.crop((xmin, ymin, xmax, ymax))

        # Resize to a fixed target size for consistency
        resized_image = cropped_image.resize(target_size)
        new_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Save the resized and cropped image
        resized_image.save(new_file_path)
        print(f"Saved cropped and resized image to {new_file_path}")
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return False
    return True
Visualizing the Images After Cropping¶
%matplotlib inline
import os
from PIL import Image
import matplotlib.pyplot as plt

# Define paths and categories
base_path = '/teamspace/studios/this_studio/final_folder/cropped/original_pictures'
categories = ['backhand', 'forehand', 'serve']

# Visualize a few samples from each category
def visualize_samples(base_path, categories):
    fig, axes = plt.subplots(1, len(categories), figsize=(15, 5))
    for i, category in enumerate(categories):
        folder_path = os.path.join(base_path, category)
        sample_image = os.listdir(folder_path)[0]  # Take the first image from each category
        img_path = os.path.join(folder_path, sample_image)
        image = Image.open(img_path)
        axes[i].imshow(image)
        axes[i].set_title(category)
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

visualize_samples(base_path, categories)
Transformation and Splitting¶
- Transformations such as Resize and Normalize prepare the images for a model (in our case ResNet) by bringing them into a uniform format.
- The images are first resized, normalized, and converted into tensors.
- The labels are based on the three categories (forehand, backhand, serve), and the data is split into training, validation, and test sets.
- train_test_split performs a stratified split so that the class distribution is preserved in each subset.
from sklearn.model_selection import train_test_split
from torchvision import transforms

# Transformation pipeline
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load all image paths and labels
image_paths = []
labels = []
category_to_label = {category: idx for idx, category in enumerate(categories)}
for category in categories:
    folder_path = os.path.join(base_path, category)
    for file in os.listdir(folder_path):
        if file.endswith('.jpeg'):
            image_paths.append(os.path.join(folder_path, file))
            labels.append(category_to_label[category])

# Split data into train (70%), validation (15%), and test (15%)
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    image_paths, labels, test_size=0.3, stratify=labels, random_state=42
)
val_paths, test_paths, val_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42
)
print(f"Train size: {len(train_paths)}, Validation size: {len(val_paths)}, Test size: {len(test_paths)}")
Train size: 1050, Validation size: 225, Test size: 225
Visualizing the Transformed Images¶
- Resizing (Resize): images are brought to a uniform size of (224, 224) to be compatible with models such as ResNet.
- Conversion to tensors (ToTensor): images are converted into PyTorch tensors, with pixel values scaled to the range [0, 1].
- Normalization (Normalize): the color channels are standardized with the means [0.485, 0.456, 0.406] and standard deviations [0.229, 0.224, 0.225], which improves training stability and performance.
from torch.utils.data import Dataset
import torch

class TennisDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# Visualize transformed images
def visualize_transformed_images(dataset, categories):
    fig, axes = plt.subplots(1, 5, figsize=(15, 5))
    for i in range(5):
        image, label = dataset[i]
        axes[i].imshow(image.permute(1, 2, 0))  # Convert CHW to HWC
        axes[i].set_title(categories[label])
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

train_dataset = TennisDataset(train_paths, train_labels, transform)
visualize_transformed_images(train_dataset, categories)
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
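This warning appears because imshow receives normalized tensors whose values fall outside [0, 1]. A minimal sketch of how the normalization could be undone for display purposes (the helper name denormalize is our own, not part of the notebook):

```python
import numpy as np

# ImageNet statistics used in the Normalize transform above
MEAN = np.array([0.485, 0.456, 0.406])
STD = np.array([0.229, 0.224, 0.225])

def denormalize(img_hwc: np.ndarray) -> np.ndarray:
    """Invert Normalize for an HWC float image so imshow gets values in [0, 1]."""
    restored = img_hwc * STD + MEAN  # channel-wise inverse of (x - mean) / std
    return np.clip(restored, 0.0, 1.0)

# Example: a normalized value of 0.0 maps back to the per-channel means
pixel = np.zeros((1, 1, 3))
print(denormalize(pixel)[0, 0])  # -> [0.485 0.456 0.406]
```

Passing `denormalize(image.permute(1, 2, 0).numpy())` to imshow would silence the warning while keeping the training pipeline unchanged.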
Model Setup¶
- A pretrained ResNet-18 model is used to classify the tennis strokes. ResNet is a standard choice for image classification and is therefore well suited to this task. Models with more layers, such as ResNet-50, would already be too complex for this task, while a simple CNN would not be expressive enough.
- The final layer of the model (fc) is replaced to support the target classes (forehand, backhand, serve).
Data Preparation¶
- The dataset is split into training, validation, and test sets.
- The data is provided for training, validation, and testing via DataLoader instances.
Model Configuration¶
- Loss function: CrossEntropyLoss is used, as it is suitable for multi-class classification problems.
- Optimizer: Adam with a learning rate of 0.001 provides efficient training.
- Test set: a held-out test set is used for the final evaluation of the model on unseen data.
from torchvision import models
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# Define the model, loss function, and optimizer
model = models.resnet18(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, len(categories))  # Adjust output layer for categories
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Split data into train, validation, and test sets
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    image_paths, labels, test_size=0.3, stratify=labels, random_state=42
)
val_paths, test_paths, val_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42
)

# Create datasets
train_dataset = TennisDataset(train_paths, train_labels, transform)
val_dataset = TennisDataset(val_paths, val_labels, transform)
test_dataset = TennisDataset(test_paths, test_labels, transform)  # Added test dataset

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # Added test DataLoader

print("Model, datasets, and DataLoaders initialized successfully!")
/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=ResNet18_Weights.IMAGENET1K_V1`. You can also use `weights=ResNet18_Weights.DEFAULT` to get the most up-to-date weights.
Model, datasets, and DataLoaders initialized successfully!
Training and Validation with Early Stopping¶
Metric initialization: lists for train_loss, val_loss, train_acc, and val_acc store the history across epochs.
Training:
- Training phase:
  - The model is put into training mode (model.train()).
  - Per batch:
    - Gradients are reset (optimizer.zero_grad()).
    - Inputs are passed through the model and the loss is computed.
    - Gradients are computed (loss.backward()) and the optimizer updates the model weights (optimizer.step()).
  - Training loss and accuracy are accumulated on the fly.
Validation:
- Validation phase:
  - The model is put into evaluation mode (model.eval()).
  - No gradient computation (torch.no_grad()).
  - Validation loss and accuracy are accumulated on the fly.
Early stopping:
- After each epoch, check whether the validation loss improved.
- If there is no improvement for early_stop_patience epochs, training is stopped.
Per-epoch results:
- Training and validation metrics (loss and accuracy) are printed at the end of each epoch.
# Initialize lists to store metrics
train_loss = []
val_loss = []
train_acc = []
val_acc = []

epochs = 10
early_stop_patience = 3  # Stop after 3 consecutive epochs without improvement
best_val_loss = float('inf')
early_stop_counter = 0

for epoch in range(epochs):
    # Training phase
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        # Calculate training accuracy
        _, predicted = torch.max(outputs, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    # Store training metrics
    train_loss.append(running_loss / len(train_loader))
    train_acc.append(100 * correct_train / total_train)

    # Validation phase
    model.eval()
    running_val_loss = 0.0
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_val_loss += loss.item()

            # Calculate validation accuracy
            _, predicted = torch.max(outputs, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    # Store validation metrics
    val_loss.append(running_val_loss / len(val_loader))
    val_acc.append(100 * correct_val / total_val)

    # Check for early stopping
    if val_loss[-1] < best_val_loss:
        best_val_loss = val_loss[-1]
        early_stop_counter = 0
    else:
        early_stop_counter += 1
        if early_stop_counter >= early_stop_patience:
            print(f"Early stopping triggered at epoch {epoch + 1}")
            break

    # Print metrics for the epoch
    print(f"Epoch {epoch + 1}/{epochs}")
    print(f"Train Loss: {train_loss[-1]:.4f}, Train Accuracy: {train_acc[-1]:.2f}%")
    print(f"Validation Loss: {val_loss[-1]:.4f}, Validation Accuracy: {val_acc[-1]:.2f}%")
Epoch 1/10
Train Loss: 0.3348, Train Accuracy: 87.43%
Validation Loss: 1.6032, Validation Accuracy: 67.11%
Epoch 2/10
Train Loss: 0.1306, Train Accuracy: 95.90%
Validation Loss: 0.0959, Validation Accuracy: 94.67%
Epoch 3/10
Train Loss: 0.0461, Train Accuracy: 98.29%
Validation Loss: 0.1900, Validation Accuracy: 92.44%
Epoch 4/10
Train Loss: 0.0335, Train Accuracy: 98.95%
Validation Loss: 0.1395, Validation Accuracy: 96.44%
Early stopping triggered at epoch 5
Result: a validation accuracy of 96.44% was reached. In the test evaluation we now check whether this also holds on unseen data or whether the model is overfitted.
# Save the trained model
model_save_path = "/teamspace/studios/this_studio/final_folder_testing/trained_tennis_model.pth" # Hardcoded path
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")
Model saved to /teamspace/studios/this_studio/final_folder_testing/trained_tennis_model.pth
The model is saved so that it can be reused more easily later on.
Model Evaluation¶
- The model is evaluated on the test data to assess its performance on new, unseen data.
- The results are visualized with a confusion matrix to assess the quality of the classification in detail.
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

# Test evaluation
model.eval()
test_labels = []
test_preds = []
with torch.no_grad():
    for inputs, labels in test_loader:  # Use test_loader here
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        test_labels.extend(labels.numpy())
        test_preds.extend(predicted.numpy())

# Calculate test accuracy
correct = sum([1 for true, pred in zip(test_labels, test_preds) if true == pred])
total = len(test_labels)
test_accuracy = 100 * correct / total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Confusion matrix for test set
test_cm = confusion_matrix(test_labels, test_preds)

# Heatmap with blue-red colormap
sns.heatmap(test_cm, annot=True, fmt="d", xticklabels=categories, yticklabels=categories, cmap="coolwarm")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Test Confusion Matrix")
plt.show()

# Classification report for test set
print("Test Set Classification Report")
print(classification_report(test_labels, test_preds, target_names=categories))
Test Accuracy: 96.89%
Test Set Classification Report
precision recall f1-score support
backhand 0.99 0.96 0.97 75
forehand 0.93 1.00 0.96 75
serve 1.00 0.95 0.97 75
accuracy 0.97 225
macro avg 0.97 0.97 0.97 225
weighted avg 0.97 0.97 0.97 225
Conclusions from the Test Matrix and Results¶
At 97%, the model's overall prediction quality is very good.
Only a few forehand and backhand strokes were misclassified.
Model performance: the model already showed very good results after only a few epochs, so the focus shifts to adding and comparing new image data.
No finetuning: we did not finetune the ResNet model, since a high accuracy was already reached quickly. Instead, we examine how well the model generalizes to new data. My project partner Simon Bieri is carrying out finetuning in parallel.
Goal: verify whether the model trained on the original data also generalizes reliably to new data.
Adding More Data (Our Own Data)¶
- Next steps: the following section describes how the new image files were added.
This code extracts frames from the given video files (backhand.MOV, forehand.MOV, serve.MOV) and stores them in the corresponding folders inside the new_data directory. Frames are saved at regular intervals (every 8th frame, which proved to be ideal), preserving the original folder structure.
import os
import cv2
from pathlib import Path

# Absolute paths to video files
video_paths = {
    'backhand.MOV': '/teamspace/studios/this_studio/final_folder_testing/backhand.MOV',
    'forehand.MOV': '/teamspace/studios/this_studio/final_folder_testing/forehand.MOV',
    'serve.MOV': '/teamspace/studios/this_studio/final_folder_testing/serve.MOV'
}

# Base path for the new data folder
new_data_base_path = '/teamspace/studios/this_studio/final_folder_testing/new_data'

# Create directories to save frames in the new_data folder
output_folders = {
    'backhand_frames': Path(new_data_base_path) / 'backhand_frames',
    'forehand_frames': Path(new_data_base_path) / 'forehand_frames',
    'serve_frames': Path(new_data_base_path) / 'serve_frames'
}

# Create the directories if they don't exist
for folder in output_folders.values():
    os.makedirs(folder, exist_ok=True)

# Function to extract frames from a video
def extract_frames(video_path, output_folder, frame_interval=8):
    if not os.path.exists(video_path):
        print(f"Error: Video file {video_path} does not exist.")
        return
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Unable to open video file {video_path}.")
        return
    frame_count = 0
    saved_frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Save frames at specific intervals
        if frame_count % frame_interval == 0:
            frame_filename = output_folder / f'frame_{saved_frame_count:04d}.jpg'
            # Skip saving if the file already exists
            if not frame_filename.exists():
                cv2.imwrite(str(frame_filename), frame)
            saved_frame_count += 1
        frame_count += 1
    cap.release()

# Extract frames for each video
for video_name, video_path in video_paths.items():
    output_folder = output_folders[video_name.split('.')[0] + '_frames']
    print(f"Processing {video_name}...")
    extract_frames(video_path, output_folder)

print("Frame extraction completed. Frames are stored in 'new_data'.")
Processing backhand.MOV...
Processing forehand.MOV...
Processing serve.MOV...
Frame extraction completed. Frames are stored in 'new_data'.
Processing the Video Recordings¶
- We process video recordings for the different stroke types: forehand, backhand, and serve.
- The stroke types are labeled in a separate text file to define the data structure clearly.
- To speed up the process, we use sequences of 5 images to extract and store the information efficiently.
import os
import shutil
from pathlib import Path
import logging
from logging import StreamHandler

# Configure logging
log_level = logging.INFO
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
logging_formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(name)-15s %(message)s",
                                      datefmt="%Y-%m-%d %H:%M:%S")
stream_handler = StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(logging_formatter)
logger.addHandler(stream_handler)

####################### INPUT DEFINITIONS ##############################################################################
# Base folder paths (updated to the new directory)
base_folder = Path("/teamspace/studios/this_studio/final_folder_testing")
image_source_paths = {
    "forehand": base_folder.joinpath("new_data/forehand_frames"),
    "backhand": base_folder.joinpath("new_data/backhand_frames"),
    "serve": base_folder.joinpath("new_data/serve_frames")
}
image_target_path = base_folder.joinpath("processed_images")
index_file_map = {
    "forehand": base_folder.joinpath("F_indexes.txt"),
    "backhand": base_folder.joinpath("B_indexes.txt"),
    "serve": base_folder.joinpath("S_indexes.txt"),
}

# General settings
num_files_per_sequence = 5
##################### END INPUT DEFINITIONS ############################################################################

# Main logic for processing all hit types
logger.info("Start script")
try:
    for hit_type, source_path in image_source_paths.items():
        # Validate hit_type
        if hit_type not in index_file_map:
            logger.warning(f"No index file found for {hit_type}. Skipping...")
            continue

        # File prefix based on hit_type
        file_prefix = hit_type[0].upper()

        # Load indexes
        index_file_path = index_file_map[hit_type]
        if not index_file_path.exists():
            logger.warning(f"Index file {index_file_path} does not exist. Skipping {hit_type}...")
            continue
        with open(index_file_path, "r") as f:
            lines = f.readlines()
        indexes = [int(x.strip("\n")) for x in lines if len(x.strip("\n")) > 0]

        # Ensure the target directory exists
        target_path = image_target_path.joinpath(hit_type)
        os.makedirs(target_path, exist_ok=True)

        # Process frames: each labeled index marks the last frame of a sequence
        for sequence_index, file_index in enumerate(indexes):
            for seq_file_index, i in enumerate(range(file_index - num_files_per_sequence + 1, file_index + 1)):
                # Adjusted to match the filename pattern (e.g., frame_XXXX.jpg)
                source_file = source_path.joinpath(f"frame_{i:04}.jpg")
                target_file = target_path.joinpath(f"{file_prefix}_{sequence_index:03}_{seq_file_index:03}.jpeg")
                if source_file.exists():
                    shutil.copyfile(source_file, target_file)
                else:
                    logger.warning(f"Source file {source_file} does not exist and was skipped.")
        logger.info(f"Finished processing {hit_type}. {len(indexes)} sequences copied to {target_path}")
except Exception as e:
    logger.exception("Exception occurred", exc_info=True)
finally:
    logger.info("Script completed.")
2024-11-27 06:35:53 INFO     __main__        Start script
2024-11-27 06:35:53 INFO     __main__        Finished processing forehand. 27 sequences copied to /teamspace/studios/this_studio/final_folder_testing/processed_images/forehand
2024-11-27 06:35:53 INFO     __main__        Finished processing backhand. 51 sequences copied to /teamspace/studios/this_studio/final_folder_testing/processed_images/backhand
2024-11-27 06:35:53 INFO     __main__        Finished processing serve. 43 sequences copied to /teamspace/studios/this_studio/final_folder_testing/processed_images/serve
2024-11-27 06:35:53 INFO     __main__        Script completed.
Visualizing a Sequence¶
import matplotlib.pyplot as plt
from pathlib import Path

# Function to visualize a sequence of 5 images
def visualize_sequence(sequence_folder, sequence_prefix, num_images=5):
    images = sorted(sequence_folder.glob(f"{sequence_prefix}_*.jpeg"))
    if len(images) < num_images:
        print(f"Not enough images in {sequence_folder} for sequence {sequence_prefix}")
        return
    fig, axes = plt.subplots(1, num_images, figsize=(15, 5))
    for i, image_path in enumerate(images[:num_images]):
        img = plt.imread(image_path)
        axes[i].imshow(img)
        axes[i].set_title(image_path.name)
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

# Example usage with relative paths based on the current working directory
current_dir = Path.cwd()  # Assuming the notebook runs from the base folder location
processed_folder = current_dir / "final_folder_testing/processed_images"
hit_type = "forehand"  # Change to "backhand" or "serve" as needed
sequence_folder = processed_folder / hit_type  # Path to the folder containing sequences
sequence_prefix = "F_000"  # Adjust the prefix to match a specific sequence

# Display the first sequence
visualize_sequence(sequence_folder, sequence_prefix)
- The final labeling takes place in a separate text file, shortly before the project deadline.
- From each sequence, 5 images are selected so that only relevant images are extracted for processing and analysis.
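The index files list, per stroke, the frame number at which a sequence ends; the copy loop then takes that frame plus the 4 preceding ones. A small sketch of this mapping (the helper name sequence_frames is our own):

```python
def sequence_frames(end_index: int, num_files_per_sequence: int = 5):
    """Return the source frame numbers belonging to one labeled sequence.

    The labeled index marks the last frame; the sequence consists of that
    frame and the (num_files_per_sequence - 1) frames before it.
    """
    return list(range(end_index - num_files_per_sequence + 1, end_index + 1))

# A label "42" in F_indexes.txt selects frames frame_0038.jpg ... frame_0042.jpg
print(sequence_frames(42))  # -> [38, 39, 40, 41, 42]
```

This mirrors the `range(file_index - num_files_per_sequence + 1, file_index + 1)` loop in the processing script above.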
Cropping¶
In the next step, as with the original images at the beginning, the photos are cropped to their bounding boxes using YOLOv5. This makes the different datasets comparable.
import torch
from pathlib import Path
from PIL import Image
import os

# Load the YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

def crop_and_resize_image(image_path: str, output_folder: str, base_folder: str, margin=0.5, target_size=(224, 224)):
    try:
        # Derive the relative path to retain folder structure
        relative_path = Path(image_path).relative_to(base_folder)  # e.g., backhand/B_001.jpeg
        new_file_path = Path(output_folder) / relative_path

        # Skip if the file already exists
        if new_file_path.exists():
            return True

        # Load the image
        image = Image.open(image_path)
        results = model(image_path)  # Run YOLO detection on the image

        # Filter detections for 'person' class
        person_detections = [det for det in results.xyxy[0].tolist() if int(det[5]) == 0]  # Class '0' is 'person'
        if not person_detections:
            return False

        # Find the largest bounding box, assumed to be the closest person
        closest_person = max(person_detections, key=lambda det: (det[2] - det[0]) * (det[3] - det[1]))

        # Extract bounding box coordinates
        xmin, ymin, xmax, ymax = map(int, closest_person[:4])

        # Calculate margin for consistent cropping
        img_width, img_height = image.size
        box_width = xmax - xmin
        box_height = ymax - ymin

        # Apply the margin, clamped to the image borders
        xmin = max(0, xmin - int(box_width * margin))
        ymin = max(0, ymin - int(box_height * margin))
        xmax = min(img_width, xmax + int(box_width * margin))
        ymax = min(img_height, ymax + int(box_height * margin))

        # Crop the image around the player
        cropped_image = image.crop((xmin, ymin, xmax, ymax))

        # Resize to a fixed target size for consistency
        resized_image = cropped_image.resize(target_size)

        # Ensure the output folder exists
        new_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Save the resized and cropped image
        resized_image.save(new_file_path)
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return False
    return True

# Define paths for the dataset
datasets = {
    "original_pictures": "/teamspace/studios/this_studio/final_folder_testing/processed_images",  # Full path
}

# Output base path (new folder: cropped_processed)
output_base_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_processed"

# Process each dataset with consistent cropping and resizing
failed_paths = []
for dataset_name, dataset_path in datasets.items():
    # Get all JPEG files recursively within the current directory
    all_file_paths = list(Path(dataset_path).rglob("*.jpeg"))

    # Specify the correct output subpath
    dataset_output_path = Path(output_base_path) / dataset_name  # e.g., cropped_processed/original_pictures
    for file_path in all_file_paths:
        success = crop_and_resize_image(
            image_path=file_path.as_posix(),
            output_folder=dataset_output_path.as_posix(),
            base_folder=dataset_path,
            margin=0.5,
            target_size=(500, 500)
        )
        if not success:
            failed_paths.append(file_path)

# Output any failed images
if failed_paths:
    print("Failed Images:")
    for failed_path in failed_paths:
        print(failed_path)
Using cache found in /home/zeus/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2024-11-12 Python-3.10.10 torch-2.2.1+cu121 CPU
Fusing layers...
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape...
This code moves image files from a source folder (original_pictures) into new target folders organized by category (backhand, forehand, serve). Missing target folders are created automatically. Finally, empty source folders are removed if they no longer contain any files.
import os
from pathlib import Path
import shutil

# Path to the current directory containing 'original_pictures'
source_base_path = Path("/teamspace/studios/this_studio/final_folder_testing/cropped_processed/original_pictures")

# New base path without 'original_pictures'
target_base_path = Path("/teamspace/studios/this_studio/final_folder_testing/cropped_processed")

# All categories (folders such as 'backhand', 'forehand', 'serve')
categories = ["backhand", "forehand", "serve"]

# Move the images
if source_base_path.exists():
    for category in categories:
        source_path = source_base_path / category
        target_path = target_base_path / category

        # Skip if source path does not exist
        if not source_path.exists():
            continue

        # Create the target folder if it does not exist
        os.makedirs(target_path, exist_ok=True)

        # Move all files from the source folder
        for file in source_path.iterdir():
            if file.is_file() and file.suffix.lower() in [".jpeg", ".jpg", ".png"]:  # Move image files only
                target_file = target_path / file.name
                shutil.move(str(file), str(target_file))  # Move the file

    # Optional: remove empty directories
    for category in categories:
        source_path = source_base_path / category
        if source_path.exists() and not any(source_path.iterdir()):
            source_path.rmdir()
else:
    print(f"Source base path does not exist: {source_base_path}. Nothing to do.")
Visualizing the Cropped Images from Our Own Dataset¶
import os
from PIL import Image
import matplotlib.pyplot as plt

# Define paths and categories
base_path = '/teamspace/studios/this_studio/final_folder_testing/cropped_processed'
categories = ['backhand', 'forehand', 'serve']

# Visualize a few samples from each category
def visualize_samples(base_path, categories):
    fig, axes = plt.subplots(1, len(categories), figsize=(15, 5))
    for i, category in enumerate(categories):
        folder_path = os.path.join(base_path, category)
        sample_image = os.listdir(folder_path)[0]  # Take the first image from each category
        img_path = os.path.join(folder_path, sample_image)
        image = Image.open(img_path)
        axes[i].imshow(image)
        axes[i].set_title(category)
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

visualize_samples(base_path, categories)
import os
from pathlib import Path
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image
# Define the second dataset's base path
second_dataset_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_processed"
# Categories and label mapping
categories = ['forehand', 'backhand', 'serve']
category_to_label = {category: idx for idx, category in enumerate(categories)}
# Load all image paths and labels from the second dataset
image_paths = []
labels = []
for category in categories:
folder_path = Path(second_dataset_path) / category
if not folder_path.exists():
print(f"Warning: {folder_path} does not exist. Skipping category {category}.")
continue
for file in folder_path.glob("*.jpeg"): # Match JPEG files
image_paths.append(str(file))
labels.append(category_to_label[category])
# Define the custom dataset class
class TennisDataset(torch.utils.data.Dataset):
def __init__(self, image_paths, labels, transform=None):
self.image_paths = image_paths
self.labels = labels
self.transform = transform
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
img_path = self.image_paths[idx]
label = self.labels[idx]
image = Image.open(img_path).convert("RGB")
if self.transform:
image = self.transform(image)
return image, label
# Define transformations for the second dataset
transform = transforms.Compose([
transforms.Resize((224, 224)), # Match the input size expected by the model
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # Same normalization as during training
])
# Create the second dataset and DataLoader
second_dataset = TennisDataset(image_paths, labels, transform)
second_loader = DataLoader(second_dataset, batch_size=32, shuffle=False)
# Load the trained model from the hardcoded path
model_save_path = "/teamspace/studios/this_studio/final_folder_testing/trained_tennis_model.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Assuming you're using a ResNet18 model
from torchvision import models
import torch.nn as nn
# Recreate the model architecture and load weights
model = models.resnet18(weights=None)  # weights=None replaces the deprecated pretrained=False
model.fc = nn.Linear(model.fc.in_features, len(categories)) # Adjust output layer for categories
model.load_state_dict(torch.load(model_save_path, map_location=device))
model.to(device)
model.eval()
# Evaluate the model on the second dataset
second_test_labels = []
second_test_preds = []
with torch.no_grad():
for inputs, labels in second_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, predicted = torch.max(outputs, 1)
second_test_labels.extend(labels.cpu().numpy().tolist()) # Flatten labels
second_test_preds.extend(predicted.cpu().numpy().tolist()) # Flatten predictions
# Calculate test accuracy for the second dataset
correct = sum(1 for true, pred in zip(second_test_labels, second_test_preds) if true == pred)
total = len(second_test_labels)
test_accuracy = 100 * correct / total
print(f"Second Dataset Test Accuracy: {test_accuracy:.2f}%")
# Confusion matrix for the second dataset
second_test_cm = confusion_matrix(second_test_labels, second_test_preds)
# Heatmap visualization
sns.heatmap(second_test_cm, annot=True, fmt="d", xticklabels=categories, yticklabels=categories, cmap="coolwarm")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Second Dataset Confusion Matrix")
plt.show()
# Classification report for the second dataset
print("Second Dataset Classification Report")
print(classification_report(second_test_labels, second_test_preds, target_names=categories))
Second Dataset Test Accuracy: 25.29%
Second Dataset Classification Report
precision recall f1-score support
forehand 0.15 0.41 0.22 135
backhand 0.20 0.14 0.16 255
serve 0.98 0.29 0.45 215
accuracy 0.25 605
macro avg 0.44 0.28 0.28 605
weighted avg 0.47 0.25 0.28 605
Unfortunately, the accuracy was only about 25 percent (weighted F1 of 0.28), below the random-guess baseline of roughly 33 percent for three classes. Is there a way to improve the predictions without using the second dataset's data for training? We therefore turn to keypoint detection, which is applied in the following section.
import os
import cv2
import mediapipe as mp
import matplotlib.pyplot as plt
# Paths to the images (ensure these paths exist and images are accessible)
img_paths = [
"/teamspace/studios/this_studio/final_folder_testing/cropped_original_pictures/original_pictures/backhand/B_001.jpeg", # Image 1
"/teamspace/studios/this_studio/final_folder_testing/cropped_processed/forehand/F_001_000.jpeg" # Image 2
]
# Initialize MediaPipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
# Create a figure for displaying both images
fig, axes = plt.subplots(1, 2, figsize=(20, 10))
# Loop through each image
for i, img_path in enumerate(img_paths):
if not os.path.exists(img_path):
print(f"Image not found: {img_path}")
continue
# Read the image
image = cv2.imread(img_path)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Perform keypoint detection
results = pose.process(image_rgb)
# Create a copy of the image to overlay keypoints
image_with_keypoints = image.copy()
if results.pose_landmarks:
# Loop through each landmark and draw it on the image
for landmark in results.pose_landmarks.landmark:
# Convert normalized coordinates to pixel coordinates
height, width, _ = image.shape
x = int(landmark.x * width)
y = int(landmark.y * height)
# Draw the keypoint
cv2.circle(image_with_keypoints, (x, y), 5, (0, 255, 0), -1)
# Draw the connections between keypoints
mp.solutions.drawing_utils.draw_landmarks(image_with_keypoints, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
# Convert BGR to RGB for displaying with matplotlib
image_with_keypoints_rgb = cv2.cvtColor(image_with_keypoints, cv2.COLOR_BGR2RGB)
# Display the image with keypoints
axes[i].imshow(image_with_keypoints_rgb)
axes[i].axis('off')
axes[i].set_title(f"Image {i+1} with Keypoints")
plt.tight_layout()
plt.show()
As the next step, MediaPipe Pose is used to detect keypoints in the images of the forehand, backhand, and serve categories, and the coordinates [x, y, z, visibility] of each detection are saved to a log file. Images in which no keypoints are detected, or which cannot be read, are also logged with corresponding notes.
import os
import mediapipe as mp
import cv2
# Paths
input_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_original_pictures/original_pictures"
output_log = "/teamspace/studios/this_studio/final_folder_testing/keypoints_log_original.txt"
# Initialize MediaPipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
# Create a text file to log keypoints
with open(output_log, "w") as log_file:
# Loop through each category folder
for category in ["forehand", "backhand", "serve"]:
category_path = os.path.join(input_path, category)
if not os.path.exists(category_path):
print(f"Warning: Category folder {category_path} does not exist. Skipping...")
continue # Skip if category folder does not exist
# Loop through each image in the category
for img_name in os.listdir(category_path):
img_path = os.path.join(category_path, img_name)
if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
print(f"Skipping non-image file: {img_name}")
continue # Skip non-image files
try:
# Read the image
image = cv2.imread(img_path)
if image is None:
print(f"Error reading image: {img_path}. Skipping...")
log_file.write(f"Image: {img_path}\nError: Unable to read image.\n\n")
continue
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Perform keypoint detection
results = pose.process(image_rgb)
if results.pose_landmarks:
# Extract keypoints as a list of [x, y, z, visibility]
keypoints = [
[lm.x, lm.y, lm.z, lm.visibility]
for lm in results.pose_landmarks.landmark
]
# Write to log file
log_file.write(f"Image: {img_path}\n")
log_file.write(f"Keypoints detected (raw data): {keypoints}\n\n")
else:
log_file.write(f"Image: {img_path}\n")
log_file.write("Keypoints detected (raw data): []\n\n")
except Exception as e:
log_file.write(f"Image: {img_path}\nError: {e}\n\n")
print(f"Error processing {img_path}: {e}")
pose.close()
print(f"Keypoint detection completed. Results saved to {output_log}")
Keypoint detection completed. Results saved to /teamspace/studios/this_studio/final_folder_testing/keypoints_log_original.txt
Example of a log entry:¶
Example of keypoint data (landmarks)¶
The following data represent landmarks detected by a pose-estimation model. Each landmark contains 4 values:
x: Horizontal coordinate (relative to the image width, normalized to 0–1).
y: Vertical coordinate (relative to the image height, normalized to 0–1).
z: Depth coordinate (relative to the camera, normalized).
visibility: Confidence score (0–1) indicating how likely the landmark is visible.
Sample data:¶
plaintext
[
[0.5, 0.6, -0.1, 0.98], # Nose (`x`, `y`, `z`, `visibility`)
[0.48, 0.55, -0.1, 0.95], # Left eye
[0.52, 0.55, -0.1, 0.96], # Right eye
...
[0.4, 0.8, 0.1, 0.85] # Left ankle
]
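The normalized coordinates above can be mapped back to pixel positions by scaling with the image dimensions, as was done when drawing the keypoints earlier. A minimal sketch using the sample values above and an assumed image size of 640×480 (both the helper name and the visibility threshold are our own choices, not part of MediaPipe):

```python
# Sample landmarks in MediaPipe's [x, y, z, visibility] format (values from the example above)
landmarks = [
    [0.5, 0.6, -0.1, 0.98],    # nose
    [0.48, 0.55, -0.1, 0.95],  # left eye
    [0.52, 0.55, -0.1, 0.96],  # right eye
]

def to_pixel_coords(landmarks, width, height, min_visibility=0.5):
    """Convert normalized landmarks to (x, y) pixel tuples, dropping low-visibility points."""
    return [
        (int(x * width), int(y * height))
        for x, y, z, visibility in landmarks
        if visibility >= min_visibility
    ]

print(to_pixel_coords(landmarks, width=640, height=480))
# [(320, 288), (307, 264), (332, 264)]
```

Note that z is ignored for drawing; it only encodes relative depth with respect to the camera.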
Training and testing on the keypoints from the log file¶
In this part, the keypoint data is loaded from the previously created log file, converted to tensors, and a multi-layer perceptron (MLP) is trained to classify tennis strokes (forehand, backhand, serve). The training and validation losses over the epochs are visualized, and the model's accuracy is evaluated on a separate test set.
import re
import torch
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from ast import literal_eval
# Base path for the project
base_path = "/teamspace/studios/this_studio/final_folder_testing"
# Log file path
log_file = f"{base_path}/keypoints_log_original.txt"
# Define a dictionary to map shot types to labels
shot_type_to_label = {"forehand": 0, "backhand": 1, "serve": 2}
# Initialize lists to store keypoints and labels
keypoints_data = []
labels = []
# Counters for skipped and valid entries
skipped_count = 0
valid_count = 0
# Define a regex pattern to extract shot type from file path
shot_type_pattern = re.compile(r"(forehand|backhand|serve)")
# Expected keypoints length (33 landmarks * 4 values per landmark)
expected_keypoints_length = 33 * 4
# Parse the log file
with open(log_file, "r") as f:
lines = f.readlines()
for line in lines:
# Find the image path and determine shot type
if line.startswith("Image:"):
match = shot_type_pattern.search(line)
if match:
current_label = shot_type_to_label[match.group(0)]
# Find keypoints data
elif line.startswith("Keypoints detected (raw data):"):
keypoints_str = line.split(":", 1)[1].strip()  # Split only on the first colon
keypoints_list = literal_eval(keypoints_str) # Safer alternative to eval()
flattened_keypoints = [value for kp in keypoints_list for value in kp] # Flatten keypoints
# Ensure keypoints have the correct length before appending
if len(flattened_keypoints) == expected_keypoints_length:
keypoints_data.append(flattened_keypoints)
labels.append(current_label)
valid_count += 1 # Increment valid count
else:
skipped_count += 1 # Increment skipped count
# Print the counts for skipped and valid entries
print(f"Valid entries processed: {valid_count}")
print(f"Skipped entries: {skipped_count}")
# Convert to tensors
X = torch.tensor(keypoints_data, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.long)
# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Create DataLoaders for PyTorch
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# Check if CUDA is available and select device accordingly
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
# Define an MLP model for classification
class KeypointClassifier(nn.Module):
def __init__(self, input_dim, num_classes):
super(KeypointClassifier, self).__init__()
self.fc1 = nn.Linear(input_dim, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, num_classes)
self.dropout = nn.Dropout(0.5)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.dropout(x)
x = torch.relu(self.fc2(x))
return self.fc3(x)
# Set up the model
input_dim = X_train.shape[1]
num_classes = len(shot_type_to_label)
model = KeypointClassifier(input_dim, num_classes).to(device)
# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop
epochs = 20
train_losses = []
val_losses = []
for epoch in range(epochs):
model.train()
running_loss = 0.0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
running_loss += loss.item()
train_losses.append(running_loss / len(train_loader))
# Validation phase
model.eval()
val_loss = 0.0
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
val_loss += loss.item()
val_loss /= len(test_loader)
val_losses.append(val_loss)
print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_losses[-1]:.4f}, Validation Loss: {val_loss:.4f}")
# Evaluate the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
_, predicted = torch.max(outputs, 1)
total += y_batch.size(0)
correct += (predicted == y_batch).sum().item()
accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")
# Plot training and validation loss
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label="Training Loss")
plt.plot(val_losses, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.title("Training and Validation Loss Over Epochs")
plt.show()
Valid entries processed: 1370
Skipped entries: 130
Using device: cpu
Epoch 1/20, Train Loss: 1.0031, Validation Loss: 0.7909
Epoch 2/20, Train Loss: 0.7056, Validation Loss: 0.5413
Epoch 3/20, Train Loss: 0.5571, Validation Loss: 0.4576
Epoch 4/20, Train Loss: 0.4765, Validation Loss: 0.4111
Epoch 5/20, Train Loss: 0.4202, Validation Loss: 0.3676
Epoch 6/20, Train Loss: 0.3899, Validation Loss: 0.3646
Epoch 7/20, Train Loss: 0.3555, Validation Loss: 0.3701
Epoch 8/20, Train Loss: 0.3731, Validation Loss: 0.3680
Epoch 9/20, Train Loss: 0.3648, Validation Loss: 0.3385
Epoch 10/20, Train Loss: 0.3389, Validation Loss: 0.3109
Epoch 11/20, Train Loss: 0.3243, Validation Loss: 0.3345
Epoch 12/20, Train Loss: 0.3204, Validation Loss: 0.2952
Epoch 13/20, Train Loss: 0.3066, Validation Loss: 0.2925
Epoch 14/20, Train Loss: 0.3133, Validation Loss: 0.2883
Epoch 15/20, Train Loss: 0.2848, Validation Loss: 0.2797
Epoch 16/20, Train Loss: 0.2920, Validation Loss: 0.2636
Epoch 17/20, Train Loss: 0.2820, Validation Loss: 0.3113
Epoch 18/20, Train Loss: 0.2827, Validation Loss: 0.2539
Epoch 19/20, Train Loss: 0.2611, Validation Loss: 0.2737
Epoch 20/20, Train Loss: 0.2661, Validation Loss: 0.2466
Test Accuracy: 90.51%
A test accuracy of 90.51 % is solid on the first dataset, but the real question is how well the model transfers to the second dataset.
Applying the trained model to the second dataset¶
Below, MediaPipe Pose is used once more to extract keypoints from the cropped images in the forehand, backhand, and serve categories and save them to a log file. This time, the newly collected, self-recorded images are used, so that the trained model can be tested on the second dataset.
import os
import mediapipe as mp
import cv2
# Updated Paths
input_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_processed"
output_log = "/teamspace/studios/this_studio/final_folder_testing/processed_keypoints_log.txt"
# Initialize MediaPipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
# Create a text file to log keypoints
with open(output_log, "w") as log_file:
# Loop through each category folder
for category in ["forehand", "backhand", "serve"]:
category_path = os.path.join(input_path, category)
if not os.path.exists(category_path):
print(f"Warning: Category folder {category_path} does not exist. Skipping...")
continue # Skip if category folder does not exist
# Loop through each image in the category
for img_name in os.listdir(category_path):
img_path = os.path.join(category_path, img_name)
if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
print(f"Skipping non-image file: {img_name}")
continue # Skip non-image files
try:
# Read the image
image = cv2.imread(img_path)
if image is None:
print(f"Error reading image: {img_path}. Skipping...")
log_file.write(f"Image: {img_path}\nError: Unable to read image.\n\n")
continue
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Perform keypoint detection
results = pose.process(image_rgb)
if results.pose_landmarks:
# Extract keypoints as a list of [x, y, z, visibility]
keypoints = [
[lm.x, lm.y, lm.z, lm.visibility]
for lm in results.pose_landmarks.landmark
]
# Write to log file
log_file.write(f"Image: {img_path}\n")
log_file.write(f"Keypoints detected (raw data): {keypoints}\n\n")
else:
log_file.write(f"Image: {img_path}\n")
log_file.write("Keypoints detected (raw data): []\n\n")
except Exception as e:
print(f"Error processing {img_path}: {e}")
log_file.write(f"Image: {img_path}\nError: {e}\n\n")
print(f"Keypoint detection completed. Results saved to {output_log}")
Keypoint detection completed. Results saved to /teamspace/studios/this_studio/final_folder_testing/processed_keypoints_log.txt
Applying the trained model to the second test set¶
The classification model trained on the first test set's keypoints from the log file is applied to a second test set. The preprocessed keypoint data (33 landmarks with 4 values each) is loaded and compared against the labels (forehand, backhand, serve).
import re
import torch
from torch.utils.data import DataLoader, TensorDataset
from ast import literal_eval
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
# Path to the processed keypoints log file
test_log_file = "/teamspace/studios/this_studio/final_folder_testing/processed_keypoints_log.txt"
# Define a dictionary to map shot types to labels
shot_type_to_label = {"forehand": 0, "backhand": 1, "serve": 2}
# Initialize lists to store keypoints and labels for testing
test_keypoints_data = []
test_labels = []
# Define a regex pattern to extract shot type from file path
shot_type_pattern = re.compile(r"(forehand|backhand|serve)")
# Expected keypoints length (33 landmarks * 4 values per landmark)
expected_keypoints_length = 33 * 4
# Parse the test log file
with open(test_log_file, "r") as f:
lines = f.readlines()
skipped_count = 0
valid_count = 0
for line in lines:
# Find the image path and determine shot type
if line.startswith("Image:"):
match = shot_type_pattern.search(line)
if match:
current_label = shot_type_to_label[match.group(0)]
# Find keypoints data
elif line.startswith("Keypoints detected (raw data):"):
keypoints_str = line.split(":", 1)[1].strip()  # Split only on the first colon
keypoints_list = literal_eval(keypoints_str) # Safer alternative to eval()
flattened_keypoints = [value for kp in keypoints_list for value in kp] # Flatten keypoints
# Ensure keypoints have the correct length before appending
if len(flattened_keypoints) == expected_keypoints_length:
test_keypoints_data.append(flattened_keypoints)
test_labels.append(current_label)
valid_count += 1
else:
skipped_count += 1
# Print the counts for skipped and valid entries
print(f"Testing: Valid entries processed: {valid_count}, Skipped entries: {skipped_count}")
# Convert to tensors
X_test = torch.tensor(test_keypoints_data, dtype=torch.float32)
y_test = torch.tensor(test_labels, dtype=torch.long)
# Create DataLoader for PyTorch
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# Ensure your model is on the correct device
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
# Evaluate the model on the test data
model.eval()
correct = 0
total = 0
all_preds = []
all_labels = []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
_, predicted = torch.max(outputs, 1)
total += y_batch.size(0)
correct += (predicted == y_batch).sum().item()
all_preds.extend(predicted.cpu().numpy())
all_labels.extend(y_batch.cpu().numpy())
accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")
# Plot confusion matrix
cm = confusion_matrix(all_labels, all_preds)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(shot_type_to_label.keys()))
disp.plot(cmap="coolwarm")
plt.title("Confusion Matrix for Test Set")
plt.show()
Testing: Valid entries processed: 554, Skipped entries: 51 Test Accuracy: 86.82%
from sklearn.metrics import f1_score
# Calculate F1 score
f1 = f1_score(all_labels, all_preds, average='weighted') # Use 'weighted' for imbalanced datasets
print(f"F1 Score (Weighted): {f1:.2f}")
F1 Score (Weighted): 0.87
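The 'weighted' average matters here because the classes are imbalanced (135/255/215 support in the earlier report). A minimal sketch on toy labels (hypothetical, not from our data) showing how weighted and macro F1 diverge when one class dominates:

```python
from sklearn.metrics import f1_score

# Toy imbalanced example: class 0 dominates, class 1 is never predicted
y_true = [0, 0, 0, 0, 1]
y_pred = [0, 0, 0, 0, 0]

# Macro averages per-class F1 equally; weighted scales each class by its support,
# so the dominant class pulls the weighted score up.
macro = f1_score(y_true, y_pred, average="macro", zero_division=0)
weighted = f1_score(y_true, y_pred, average="weighted", zero_division=0)

print(f"macro:    {macro:.2f}")     # 0.44
print(f"weighted: {weighted:.2f}")  # 0.71
```

Macro F1 would therefore be the stricter metric here; we report the weighted score to stay comparable with the accuracy figures.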
The comparison between the two datasets worked very well! Accuracy rose from 25 % to 87 %. The strokes were classified with the model trained on the first keypoint log file and applied successfully to the unseen data of the second dataset, a great success!
Overall analysis of the results¶
The ResNet-18 model achieved excellent results when trained and tested on a single dataset, with an accuracy of 86 %. However, performance dropped drastically as soon as a second dataset was added for testing, falling below random chance. One possible remedy would be to add more images and train on the datasets jointly to improve generalization to unseen data; however, collecting additional data for further datasets would have been too time-consuming within the scope of this project.
The keypoint-based method showed strong performance, reaching an accuracy of roughly 87 % across the two datasets. Remarkably, only keypoint information from text files was used. The results show that the keypoint method has great potential for generalization problems, while ResNet-18 could be optimized further by incorporating additional data.